package net.bwie.realtime.jtp.dwd.trade.function;

import com.alibaba.fastjson.JSONObject;
import net.bwie.realtime.jtp.common.utils.HbaseUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;

import java.util.Map;

/**
 * Map function that enriches a JSON record with a {@code province_name} field.
 *
 * <p>The name is looked up by the record's {@code province_id} in a map that is loaded
 * once per function instance in {@link #open(Configuration)} via
 * {@code HbaseUtil.scanData("dim_base_province", "info", "name")}.
 * NOTE(review): presumably that call returns row key -> value of column {@code info:name};
 * confirm against {@code HbaseUtil}.
 */
public class LoadProvinceDimMapFunction extends RichMapFunction<String, String> {
    /** Value written to {@code province_name} when no name is available for the id. */
    private static final String UNKNOWN_PROVINCE = "unknown";

    /**
     * Province lookup loaded in {@link #open(Configuration)}. Marked transient because it is
     * runtime state that is rebuilt in {@code open} and must not be part of the serialized function.
     */
    private transient Map<String, String> cacheMap;

    /**
     * Loads the province lookup map.
     *
     * @param parameters configuration passed by the runtime (not used here)
     * @throws IllegalStateException if the lookup map could not be loaded (helper returned null)
     * @throws Exception if the underlying scan fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Map<String, String> loaded = HbaseUtil.scanData("dim_base_province", "info", "name");
        if (loaded == null) {
            // Fail fast here instead of hitting a NullPointerException on the first record.
            throw new IllegalStateException(
                    "Failed to load province dimension from dim_base_province (info:name): scan returned null");
        }
        cacheMap = loaded;
    }

    /**
     * Parses the incoming JSON string, adds {@code province_name} and returns the record
     * serialized back to a JSON string.
     *
     * @param value JSON object string expected to carry a {@code province_id} field
     * @return the same JSON object with {@code province_name} set; {@code "unknown"} when the
     *         id is absent from the lookup map or has no name
     * @throws IllegalArgumentException if {@code value} does not parse to a JSON object
     *         (for example a null or empty string)
     */
    @Override
    public String map(String value) throws Exception {
        // Parse the record.
        JSONObject jsonObject = JSON.parseObject(value);
        if (jsonObject == null) {
            // parseObject yields null for null/empty input; report the offending record
            // instead of failing with an anonymous NullPointerException below.
            throw new IllegalArgumentException(
                    "Cannot enrich record, input is not a JSON object: " + value);
        }

        String provinceId = jsonObject.getString("province_id");
        String provinceName = cacheMap.get(provinceId);
        if (provinceName == null) {
            // Covers both a missing id and an id stored with a null name, so the output
            // always carries a non-null province_name.
            provinceName = UNKNOWN_PROVINCE;
        }
        jsonObject.put("province_name", provinceName);
        return jsonObject.toJSONString();
    }

    /**
     * Releases the cached lookup map after delegating to the superclass.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        // Drop the reference so the cache can be garbage collected once the task is closed.
        cacheMap = null;
    }
}
